跳到主要内容

Java 多线程-线程池之异步任务

学习一个技术最好的方式就是去使用它,写了几天游戏,在它里面写了一个事件管理器,发现效果非常不错,所以想到如何在 Java 的线程池中使用这种异步调用呢?

创建事件标识

public enum EventID {
TEST
}

创建监听者接口

public interface IEventObserver {
void handleEvent(EventData<?> resp);
}

编写一个 EventData

public class EventData<T> {
private final EventID eid;
private T data;

private EventData(EventID eid) {
this.eid = eid;
}

public void send(T data) {
this.data = data;
EventManager.getInstance().sendEvent(this);
}

public static <T> EventData<T> createEvent(EventID eid) {
return new EventData<>(eid);
}

public EventID getEid() {
return eid;
}

public T getData() {
return this.data;
}

}

EventManager 事件管理器

这里使用线程池来延迟处理事件

public class EventManager {
private final Map<EventID, List<IEventObserver>> observerList;
// 开辟一个线程池来处理任务
ExecutorService ser = Executors.newFixedThreadPool(10);

/* 注意:这里得加 volatile 防止内存重排 */
private static volatile EventManager instance = null;

private EventManager() {
observerList = new HashMap<>();
}

/**
* Java 无法像 TypeScript 或者 C# 那样方便的写泛型模板,所以还不如直接在里面写
* @return EventManager 单例
*/
public static EventManager getInstance() {
// 先判断实例是否存在,若不存在再对类对象进行加锁处理
if (instance == null) {
synchronized (EventManager.class) {
if (instance == null) {
instance = new EventManager();
}
}
}
return instance;
}

/**
* 发送事件
*/
public void sendEvent(EventData<?> eve) {
// 如果没有观察者监听这个事件则结束
if (!this.observerList.containsKey(eve.getEid())) return;

// 通知监听了这个事件的全部观察者
var observers = this.observerList.get(eve.getEid());
// 非空才执行
Optional.of(observers).ifPresent(list -> list.forEach(x -> {
if (x != null) {
// 丢到线程池里面执行
ser.execute(()-> x.handleEvent(eve));
}
}));
}

/**
* 注册一个监听者
*
* @param newObj 需要注册的监听者
* @param eid 需要监听的事件 ID
*/
private void registerObj(IEventObserver newObj, EventID eid) {
if (!this.observerList.containsKey(eid)) {
var list = new ArrayList<IEventObserver>();
list.add(newObj);
this.observerList.put(eid, list);
} else {
var list = this.observerList.get(eid);
// 不存在才要添加
if (list != null && !list.contains(newObj)) {
list.add(newObj);
}
}
}

/**
* 移除监听者
*
* @param removeObj 需要移除的监听对象
*/
public void removeObj(IEventObserver removeObj) {
this.observerList.forEach((k, v) -> v.remove(removeObj));
}

/**
* 监听者在这里注册,注意这里形参是可变参数
*
* @param newObj 需要被注册的监听者
* @param eids 需要监听的事件列表
*/
public static void register(IEventObserver newObj, EventID... eids) {
for (var eid : eids) {
EventManager.getInstance().registerObj(newObj, eid);
}
}

/**
* 移除一个监听者
* @param removeObj 需要移除的对象
*/
public static void remove(IEventObserver removeObj) {
EventManager.getInstance().removeObj(removeObj);
}
}

编写一个测试类

public class Main {

static class Test implements IEventObserver {

private final String name;

Test(String name) {
this.name = name;
EventManager.register(this, EventID.TEST);
}

@Override
public void handleEvent(EventData<?> resp) {
System.out.println(name + ":当前执行的数据是:" + resp.getData().toString() + ",执行的工作线程是:" + Thread.currentThread().getName());
}
}

public static void main(String[] args) {
// 测试创建多个对象
List<Test> tests = Arrays.asList(
new Test("张三"),
new Test("李四01"),
new Test("李四02"),
new Test("李四03"),
new Test("李四05"),
new Test("李四04")
);

/*
* 注意,这样是有问题的,因为 EventManager 处理的速度,不够这里发送的快,而且,
* 这里的 event 一直都是同一个,所以当 EventManager 处理时 EventData 里面的数据已经被改了
*/
// EventData<Object> event = EventData.createEvent(EventID.TEST);
// for (var i = 0; i < 1000; i++) {
// event.send("当前是" + i);
// }


for (var i = 0; i < 1000; i++) {
EventData.createEvent(EventID.TEST).send("当前是" + i);
}
}
}

打印结果如下: